package org.databandtech.streamjob.jobs;

import java.math.BigDecimal;
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.databandtech.streamjob.sink.SinkToMySQL;

/**
 * Flink streaming job that consumes String records from a Kafka topic and
 * writes them to MySQL through the custom JDBC sink {@link SinkToMySQL}.
 *
 * <p>Runs as a {@link Runnable}: {@link #run()} builds the environment,
 * wires source to sink, and executes the job.
 */
public class KafkaToMysqlJob implements Runnable {

	final static String READ_TOPIC = "Hello-Kafka";
	final static String URL = "jdbc:mysql://localhost:3307/flink?useUnicode=true&characterEncoding=utf-8&useSSL=false";
	final static String USER = "root";
	// NOTE(review): credentials are hard-coded; consider externalizing to configuration.
	final static String PASS = "mysql";
	final static String SQLSINK = "insert into databand_usermoney(username,age,bonus)values(?,?,?);";

	@Override
	public void run() {

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(5000); // take a checkpoint every 5000 ms
		FlinkKafkaConsumer<String> kafkaConsumer = readStreamData();

		sinkData(env, kafkaConsumer);
	}

	/**
	 * Wires the Kafka source into the MySQL sink and executes the job.
	 *
	 * @param env           the stream execution environment
	 * @param kafkaConsumer the configured Kafka source
	 * @throws RuntimeException if job execution fails
	 */
	private void sinkData(StreamExecutionEnvironment env, FlinkKafkaConsumer<String> kafkaConsumer) {
		DataStream<String> streamInput = env.addSource(kafkaConsumer);
		// Write every record to MySQL; run the sink with 2 parallel subtasks.
		DataStreamSink<String> dataWriteStream = streamInput.addSink(new SinkToMySQL(URL, USER, PASS, SQLSINK));
		dataWriteStream.setParallelism(2);

		try {
			env.execute("ok-mysql");
		} catch (Exception e) {
			// Propagate instead of swallowing: a failed execute() means the job did not run.
			throw new RuntimeException("Flink job 'ok-mysql' failed", e);
		}
	}

	/**
	 * Builds a Kafka consumer for {@link #READ_TOPIC} that starts from the
	 * earliest available offsets (full re-read of the topic).
	 *
	 * @return the configured Kafka source
	 */
	private FlinkKafkaConsumer<String> readStreamData() {
		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", "192.168.10.60:9092");
		properties.setProperty("group.id", "test");
		// NOTE(review): "stream.parallelism" is not a Kafka consumer property and is
		// ignored by the client; set parallelism on the Flink operator if intended.
		properties.setProperty("stream.parallelism", "4");
		FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(READ_TOPIC, new SimpleStringSchema(),
				properties);

		// Start from the earliest record (full collection).
		kafkaConsumer.setStartFromEarliest();
		// Alternative start positions:
		// kafkaConsumer.setStartFromLatest();                            // newest records only
		// kafkaConsumer.setStartFromTimestamp(startupOffsetsTimestamp);  // epoch millis
		// kafkaConsumer.setStartFromSpecificOffsets(specificStartOffsets); // per-partition offsets
		// kafkaConsumer.setStartFromGroupOffsets();                      // the default

		return kafkaConsumer;
	}

	/**
	 * Maps a (username, age) pair to (username, age, bonus), where the bonus
	 * tier is chosen by age: under 30 -&gt; 1000, 30-59 -&gt; 2000, 60+ -&gt; 3000.
	 */
	public static class MapTransformation
			implements MapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, BigDecimal>> {

		private static final long serialVersionUID = 1L;

		@Override
		public Tuple3<String, Integer, BigDecimal> map(Tuple2<String, Integer> in) throws Exception {
			final BigDecimal bonus;
			if (in.f1 < 30) {
				bonus = BigDecimal.valueOf(1000);
			} else if (in.f1 < 60) { // >= 30 is implied by the previous branch
				bonus = BigDecimal.valueOf(2000);
			} else {
				bonus = BigDecimal.valueOf(3000);
			}

			// BUG FIX: the original built the tuple field-by-field but returned null,
			// discarding the result for every record.
			return new Tuple3<>(in.f0, in.f1, bonus);
		}
	}

}
